Demo code for common libraries#

Specifically, the intake, fsspec, dask, and xarray libraries. These libraries are used in most of the HyTEST notebooks, and understanding how they work together will make the code easier to follow.

Author: Andrew Laws - USGS Web Informatics and Mapping (WIM)

Date: 3/20/2023

Focus: intake, fsspec, dask, and xarray libraries use in climate and forcings big data

# library imports
import fsspec
import hvplot.xarray
import intake
import os
import warnings
import rioxarray
import dask

from dask.distributed import LocalCluster, Client
from pygeohydro import pygeohydro

import xarray as xr
import geopandas as gpd
import pandas as pd
import geoviews as gv
import dask.dataframe as dd

warnings.filterwarnings('ignore')

Dask is a super useful Python package for parallel and lazy computing.#

When working with a dataset that is multiple terabytes in size, such as CONUS404, Dask allows you to perform lazy operations, meaning the whole dataset is not read into memory (RAM) all at once but only once you're ready for it. A prime example is work within HyTEST where we lazily read in CONUS404, subset the data to certain variables and a limited geographic extent, and then load it into memory. Before you start to use Dask in depth, make sure you read through the Dask documentation.

Another huge benefit: you can avoid the annoying step of downloading data to a local folder, instead reading it directly from a web service, which saves time. If the base data is updated, you don't have to download the dataset again; the latest version is automatically available to your notebook.
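To build intuition for what "lazy" means before diving into Dask itself, here is a minimal, pure-Python sketch of the idea (this is a conceptual toy, not Dask's actual implementation): each operation records what to do rather than doing it, and nothing executes until you ask for the result.

```python
# Conceptual sketch of lazy evaluation (a toy, not Dask itself): each node
# records a function and its inputs instead of computing immediately.
class Lazy:
    def __init__(self, func, *deps):
        self.func = func
        self.deps = deps

    def compute(self):
        # Recursively evaluate any lazy dependencies, then apply this node's
        # function -- this is the only place real work happens
        args = [d.compute() if isinstance(d, Lazy) else d for d in self.deps]
        return self.func(*args)

# Build a small task graph for (2 + 3) * 10 -- nothing is evaluated yet
total = Lazy(lambda a, b: a * b, Lazy(lambda x, y: x + y, 2, 3), 10)
print(total.compute())  # evaluation happens only here -> 50
```

Dask applies this same deferred-graph idea to arrays, dataframes, and arbitrary task graphs, and additionally runs the graph in parallel across workers.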

Starting up dask on HPC or Nebari using a helper function#

def configure_cluster(machine):
    ''' Helper function to configure cluster
    '''
    if machine == 'denali':
        from dask.distributed import LocalCluster, Client
        cluster = LocalCluster(threads_per_worker=1)
        client = Client(cluster)
    
    elif machine == 'tallgrass':
        from dask.distributed import Client
        from dask_jobqueue import SLURMCluster
        cluster = SLURMCluster(queue='cpu', cores=1, interface='ib0',
                               job_extra=['--nodes=1', '--ntasks-per-node=1', '--cpus-per-task=1'],
                               memory='6GB')
        cluster.adapt(maximum_jobs=30)
        client = Client(cluster)
        
    elif machine == 'local':
        import os
        import warnings
        from dask.distributed import LocalCluster, Client
        warnings.warn("Running locally can result in costly data transfers!\n")
        n_cores = os.cpu_count() # set to match your machine
        cluster = LocalCluster(threads_per_worker=n_cores)
        client = Client(cluster)
        
    elif machine in ['esip-qhub-gateway-v0.4']:   
        import sys, os
        sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
        import ebdpy as ebd
        aws_profile = 'esip-qhub'
        ebd.set_credentials(profile=aws_profile)

        aws_region = 'us-west-2'
        endpoint = f's3.{aws_region}.amazonaws.com'
        ebd.set_credentials(profile=aws_profile, region=aws_region, endpoint=endpoint)
        worker_max = 30
        client,cluster = ebd.start_dask_cluster(profile=aws_profile, worker_max=worker_max, 
                                              region=aws_region, use_existing_cluster=True,
                                              adaptive_scaling=False, wait_for_cluster=False, 
                                              worker_profile='Medium Worker', propagate_env=True)
        
    return client, cluster

# if-else to determine HPC or Nebari
if 'SLURM_CLUSTER_NAME' in os.environ:  # USGS HPC systems use SLURM to handle jobs, otherwise...
    machine = os.environ['SLURM_CLUSTER_NAME']
    client, cluster = configure_cluster(machine)
else:  # use the Nebari machine
    machine = 'esip-qhub-gateway-v0.4'
    client, cluster = configure_cluster(machine)
Region: us-west-2
No Cluster running.
Starting new cluster.
{}
Setting Cluster Environment Variable AWS_DEFAULT_REGION us-west-2
Setting Fixed Scaling workers=30
Reconnect client to clear cache
client.dashboard_link (for new browser tab/window or dashboard searchbar in Jupyterhub):
https://nebari.esipfed.org/gateway/clusters/dev.42594f79b9be466e9405658bd5cd852b/status
Propagating environment variables to workers
Using environment: users/users-pangeofu

You can also use Dask on your personal computer. This bit of Dask documentation explains how easy this is to set up.

# cluster = LocalCluster()
# client = Client(cluster)

With HyTEST, we use intake catalogs in our code to make data I/O more uniform.#

If we change where a dataset gets imported from, we only have to change it in one place rather than in each notebook. Catalogs can also be nested, as you'll see next.

Intake readthedocs site

Example of HyTEST catalogs

Note: Select datasets that end in "onprem" if running on the Denali/Tallgrass HPC systems, or datasets that end in "cloud" if working on QHub/Nebari or locally.

url = 'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml'
cat = intake.open_catalog(url)
list(cat)
['conus404-drb-eval-tutorial-catalog',
 'nhm-v1.0-daymet-catalog',
 'nhm-v1.1-c404-bc-catalog',
 'nhm-v1.1-gridmet-catalog',
 'conus404-catalog',
 'nwis-streamflow-usgs-gages-onprem',
 'nwis-streamflow-usgs-gages-cloud',
 'nwm21-streamflow-usgs-gages-onprem',
 'nwm21-streamflow-usgs-gages-cloud',
 'nwm21-streamflow-cloud',
 'nwm21-scores',
 'lcmap-cloud',
 'rechunking-tutorial-cloud']

Notice the 'conus404-drb-eval-tutorial-catalog'? That is a nested catalog for data we use in a specific tutorial series, and it can be accessed like this.

nested_cat = cat['conus404-drb-eval-tutorial-catalog']
list(nested_cat)
['conus404-drb-OSN',
 'prism-drb-OSN',
 'ceres-drb-OSN',
 'crn-drb-OSN',
 'hcn-drb-OSN']

And if you want to see the details about the dataset:

nested_cat['conus404-drb-OSN']
conus404-drb-OSN:
  args:
    storage_options:
      anon: true
      client_kwargs:
        endpoint_url: https://renc.osn.xsede.org
      requester_pays: false
    urlpath: s3://rsignellbucket2/hytest/tutorials/conus404_model_evaluation/c404_drb.nc
    xarray_kwargs:
      decode_coords: all
  description: CONUS404 Delaware River Basin subset, 40 years of monthly data for
    CONUS404 model evaluation
  driver: intake_xarray.netcdf.NetCDFSource
  metadata:
    catalog_dir: https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/subcatalogs

The other handy bit about intake is that you can easily open a dataset as a Dask-backed object (though the base class will be an xarray Dataset). This is done lazily. But why an xarray Dataset? From this read about the Dask ecosystem: "xarray: Wraps Dask Array, offering the same scalability, but with axis labels which add convenience when dealing with complex datasets."

c404_drb = nested_cat['conus404-drb-OSN'].to_dask()
c404_drb
<xarray.Dataset>
Dimensions:      (time: 504, y: 105, x: 47)
Coordinates:
    lon          (y, x) float32 ...
  * x            (x) float64 1.768e+06 1.772e+06 ... 1.948e+06 1.952e+06
  * y            (y) float64 2e+05 2.04e+05 2.08e+05 ... 6.12e+05 6.16e+05
    lat          (y, x) float32 ...
  * time         (time) datetime64[ns] 1979-10-31 1979-11-30 ... 2021-09-30
    crs          int32 ...
Data variables:
    TK           (time, y, x) float32 ...
    RNET         (time, y, x) float32 ...
    PREC_ACC_NC  (time, y, x) float32 ...
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...

What is xarray?#

From the Xarray website: “Xarray introduces labels in the form of dimensions, coordinates and attributes on top of raw NumPy-like multidimensional arrays, which allows for a more intuitive, more concise, and less error-prone developer experience.”

If you’ve used numpy and pandas before, the API will be relatively familiar. As mentioned above, since it wraps a Dask array, it allows computation on large datasets in parallel. It can also take in many cloud and local file types. See this Xarray documentation about that. As with Dask, an upfront time investment in understanding Xarray will pay long-term efficiency rewards.
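To make the "labels on top of NumPy" idea concrete, here is a small sketch using pandas (already imported in this notebook); xarray extends this same label-based selection to N-dimensional arrays via `.sel()`. The temperature values and dates below are made up for illustration.

```python
import numpy as np
import pandas as pd

# Raw NumPy: selection is positional, so you must remember what index 1 means
temps = np.array([281.2, 283.4, 279.9])
print(temps[1])  # -> 283.4

# Labeled selection: the same data addressed by a meaningful date label.
# xarray applies this idea to every dimension of a multidimensional array,
# e.g. ds.sel(time="1979-11-30") instead of ds[..., 1, ...]
temps_labeled = pd.Series(
    temps,
    index=pd.to_datetime(["1979-10-31", "1979-11-30", "1979-12-31"]),
)
print(temps_labeled.loc["1979-11-30"])  # same value, selected by label
```

Labels make subsetting code self-documenting: a reader sees which time step or coordinate is selected without needing to know the array's layout.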

But what if you don’t want to use intake catalogs for a bunch of datasets you don’t maintain?

fsspec allows a user to connect with many different filesystems for querying, reading, and writing data.#

Want data from an AWS S3 bucket? fsspec has you covered, whether it requires credentials (requester pays and no anonymous read) or not (anonymous reads). Local files? Yep. Open data portals using something such as THREDDS Data Server? Still good. For a more in-depth look at fsspec, read through the fsspec documentation.

An additional benefit is that many libraries (dask, xarray, and pandas) already use fsspec in the background for reading and writing data. Next, you'll see a couple of examples of reading in data using fsspec directly and indirectly (through xarray), as well as incorporating dask.
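The key point is that every fsspec backend exposes the same interface. A minimal sketch using fsspec's built-in "memory" filesystem (an in-memory backend handy for demos; the file path and contents here are made up) shows the pattern that carries over unchanged to `s3`, `ftp`, `https`, and other protocols:

```python
import fsspec
import pandas as pd

# "memory" is an in-memory filesystem bundled with fsspec -- the same
# filesystem() / open() calls work for "s3", "ftp", "https", etc.
fs = fsspec.filesystem("memory")

# Write a tiny CSV into the in-memory filesystem (illustrative data)
with fs.open("/demo/stations.csv", "w") as f:
    f.write("station,elev\nABC,2724\nDEF,4847\n")

# Any library that accepts a file-like object can read from it directly,
# just as pd.read_table(fs.open(...)) does over FTP later in this notebook
df = pd.read_csv(fs.open("/demo/stations.csv"))
print(df)
```

Swapping `"memory"` for another protocol string (plus any needed credentials in `storage_options`) is all it takes to point the same code at a different storage system.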

First up, some data is only available through the older File Transfer Protocol (FTP); for that, we can use fsspec's FTPFileSystem class.

from fsspec.implementations.ftp import FTPFileSystem

One thing to note: FTP connections typically have a timeout, meaning if you take too long after instantiating the connection, it will disconnect. All you have to do is reconnect by running the code again (next cell). In the next few cells, we read in Climate Reference Network station data over an FTP connection.

First, create an FTP file system connection.

fs = FTPFileSystem("ftp.ncei.noaa.gov")
fs.ls("/pub/data/uscrn/products/")
[{'modify': '20230101154006',
  'perm': 'flcdmpe',
  'type': 'directory',
  'unique': '29U7B0062A895',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/daily01',
  'size': 0},
 {'modify': '20101215215611',
  'perm': 'flcdmpe',
  'type': 'directory',
  'unique': '29U7B83636414',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/hourly01',
  'size': 0},
 {'modify': '20230101021741',
  'perm': 'flcdmpe',
  'type': 'directory',
  'unique': '29U7C0074877B',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/hourly02',
  'size': 0},
 {'modify': '20220302211849',
  'perm': 'flcdmpe',
  'type': 'directory',
  'unique': '29U7C80B4EA9E',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/monthly01',
  'size': 0},
 {'modify': '20170705141338',
  'perm': 'flcdmpe',
  'type': 'directory',
  'unique': '29U2CB9B4F',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/previous_docs',
  'size': 0},
 {'modify': '20120613145107',
  'perm': 'flcdmpe',
  'type': 'directory',
  'unique': '29U807515E5',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/snapshots',
  'size': 0},
 {'modify': '20211219020507',
  'perm': 'flcdmpe',
  'type': 'directory',
  'unique': '29U1007DBEBD',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/soil01',
  'size': 0},
 {'modify': '20151005045036',
  'perm': 'flcdmpe',
  'type': 'directory',
  'unique': '29U280984611',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/soilsip01',
  'size': 0},
 {'modify': '20230101015409',
  'perm': 'flcdmpe',
  'type': 'directory',
  'unique': '29U3007AB113',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/subhourly01',
  'size': 0},
 {'modify': '20170630183509',
  'perm': 'adfrw',
  'size': 4242,
  'type': 'file',
  'unique': '29U1E0110F37C',
  'unix.group': '50',
  'unix.mode': '0664',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/DATASET-STATUS.txt'},
 {'modify': '20130715195333',
  'perm': 'adfrw',
  'size': 855,
  'type': 'file',
  'unique': '29U1E0110F37D',
  'unix.group': '50',
  'unix.mode': '0664',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/rss.xml'},
 {'modify': '20210830171807',
  'perm': 'fle',
  'type': 'directory',
  'unique': '29U4D0063872E',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/.heat01',
  'size': 0},
 {'modify': '20211123170434',
  'perm': 'fle',
  'type': 'directory',
  'unique': '29U18021845E',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/heat01',
  'size': 0},
 {'modify': '20211217212112',
  'perm': 'fle',
  'type': 'directory',
  'unique': '29U4C8157522C',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/drought01',
  'size': 0},
 {'modify': '20211206050009',
  'perm': 'adfr',
  'size': 36542,
  'type': 'file',
  'unique': '29U1E0402940C',
  'unix.group': '50',
  'unix.mode': '0664',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/stations.tsv'},
 {'modify': '20211206232838',
  'perm': 'fle',
  'type': 'directory',
  'unique': '29U5981C8AF35',
  'unix.group': '50',
  'unix.mode': '02775',
  'unix.owner': '14',
  'name': '/pub/data/uscrn/products/soil',
  'size': 0}]

Since the file type is tab-separated values (TSV), we will use the pd.read_table function to create a DataFrame.

uscrn_all = pd.read_table(fs.open("/pub/data/uscrn/products/stations.tsv")) 
uscrn_all.head()
WBAN COUNTRY STATE LOCATION VECTOR NAME LATITUDE LONGITUDE ELEVATION STATUS COMMISSIONING CLOSING OPERATION PAIRING NETWORK STATION_ID
0 03047 US TX Monahans 6 ENE Sandhills State Park 31.62 -102.80 2724 Commissioned 2004-01-11 19:00:00.0 NaN Operational NaN USCRN 1019
1 03048 US NM Socorro 20 N Sevilleta National Wildlife Refuge (LTER Site) 34.35 -106.88 4847 Commissioned 2004-01-11 19:00:00.0 NaN Operational NaN USCRN 1020
2 03054 US TX Muleshoe 19 S Muleshoe National Wildlife Refuge (Headquarter... 33.95 -102.77 3742 Commissioned 2004-04-22 20:00:00.0 NaN Operational NaN USCRN 1067
3 03055 US OK Goodwell 2 E OK Panhandle Research & Extn. Center (Native ... 36.59 -101.59 3266 Commissioned 2004-04-22 20:00:00.0 NaN Operational NaN USCRN 1068
4 03060 US CO Montrose 11 ENE Black Canyon of the Gunnison National Park (Ve... 38.54 -107.69 8402 Commissioned 2004-09-07 20:00:00.0 NaN Operational NaN USCRN 1109

What about reading from a permissioned S3 bucket? Let's find the URL for a dataset in the intake catalog. Note: this requires you to have AWS credentials.

If you are unsure of what a permissioned/requester-pays bucket is, read this resource.

conus404_cat = cat["conus404-catalog"]
conus404_cat['conus404-hourly-cloud']
conus404-hourly-cloud:
  args:
    consolidated: true
    storage_options:
      requester_pays: true
    urlpath: s3://nhgf-development/conus404/conus404_hourly_202209.zarr
  description: 'CONUS404 Hydro Variable subset, 40 years of hourly values. These files
    were created wrfout model output files (see ScienceBase data release for more
    details: https://www.sciencebase.gov/catalog/item/6372cd09d34ed907bf6c6ab1). This
    dataset is stored on AWS S3 cloud storage in a requester-pays bucket. You can
    work with this data for free if your workflow is running in the us-west-2 region,
    but you will be charged according to AWS S3 pricing (https://aws.amazon.com/s3/pricing/)
    to read the data into a workflow running outside of the cloud or in a different
    AWS cloud region.'
  driver: intake_xarray.xzarr.ZarrSource
  metadata:
    catalog_dir: https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/subcatalogs

Using the urlpath in xarray, we can read in the dataset using saved AWS credentials and the fsspec.get_mapper() method to pass the correct parameters to xarray.open_zarr().

c404_hourly_url = "s3://nhgf-development/conus404/conus404_hourly_202209.zarr"

fs_gm = fsspec.get_mapper(c404_hourly_url, # given the url, fsspec will know it is speaking to an S3 file system
                          anon=False,
                          requester_pays=True, # our S3 credentials will be charged
                          client_kwargs={'region_name':'us-west-2'})

# open dataset
c404_hourly = xr.open_zarr(fs_gm)
c404_hourly
<xarray.Dataset>
Dimensions:         (time: 368064, y: 1015, x: 1367, bottom_top_stag: 51,
                     bottom_top: 50, soil_layers_stag: 4, x_stag: 1368,
                     y_stag: 1016, snow_layers_stag: 3, snso_layers_stag: 7)
Coordinates:
    lat             (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lat_u           (y, x_stag) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lat_v           (y_stag, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon             (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon_u           (y, x_stag) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon_v           (y_stag, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
  * time            (time) datetime64[ns] 1979-10-01 ... 2021-09-25T23:00:00
  * x               (x) float64 -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06
  * y               (y) float64 -2.028e+06 -2.024e+06 ... 2.024e+06 2.028e+06
Dimensions without coordinates: bottom_top_stag, bottom_top, soil_layers_stag,
                                x_stag, y_stag, snow_layers_stag,
                                snso_layers_stag
Data variables: (12/157)
    ACDEWC          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACDRIPR         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACDRIPS         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACECAN          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACEDIR          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACETLSM         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ...              ...
    ZNU             (bottom_top) float32 dask.array<chunksize=(50,), meta=np.ndarray>
    ZNW             (bottom_top_stag) float32 dask.array<chunksize=(51,), meta=np.ndarray>
    ZS              (soil_layers_stag) float32 dask.array<chunksize=(4,), meta=np.ndarray>
    ZSNSO           (time, snso_layers_stag, y, x) float32 dask.array<chunksize=(144, 7, 175, 175), meta=np.ndarray>
    ZWT             (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    crs             int32 ...
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...

Or you can open the dataset lazily with Dask by setting the parameter chunks={}.

# open dataset lazily 
# this happens by setting chunks={}
# though you can specify chunks where applicable
c404_hourly_dask = xr.open_zarr(fs_gm, chunks={})
c404_hourly_dask
<xarray.Dataset>
Dimensions:         (time: 368064, y: 1015, x: 1367, bottom_top_stag: 51,
                     bottom_top: 50, soil_layers_stag: 4, x_stag: 1368,
                     y_stag: 1016, snow_layers_stag: 3, snso_layers_stag: 7)
Coordinates:
    lat             (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lat_u           (y, x_stag) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lat_v           (y_stag, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon             (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon_u           (y, x_stag) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon_v           (y_stag, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
  * time            (time) datetime64[ns] 1979-10-01 ... 2021-09-25T23:00:00
  * x               (x) float64 -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06
  * y               (y) float64 -2.028e+06 -2.024e+06 ... 2.024e+06 2.028e+06
Dimensions without coordinates: bottom_top_stag, bottom_top, soil_layers_stag,
                                x_stag, y_stag, snow_layers_stag,
                                snso_layers_stag
Data variables: (12/157)
    ACDEWC          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACDRIPR         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACDRIPS         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACECAN          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACEDIR          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACETLSM         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ...              ...
    ZNU             (bottom_top) float32 dask.array<chunksize=(50,), meta=np.ndarray>
    ZNW             (bottom_top_stag) float32 dask.array<chunksize=(51,), meta=np.ndarray>
    ZS              (soil_layers_stag) float32 dask.array<chunksize=(4,), meta=np.ndarray>
    ZSNSO           (time, snso_layers_stag, y, x) float32 dask.array<chunksize=(144, 7, 175, 175), meta=np.ndarray>
    ZWT             (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    crs             int32 ...
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...

The datasets look the same, but the second takes up less space in memory. Now for an example of lazy computation, using radiation variables to calculate a made-up variable, FAKERAD. Here are the equations:

\begin{equation}FAKERAD = DNB - UPB\end{equation}

\begin{equation}DNB = ACSWDNB - ACLWDNB\end{equation}

\begin{equation}UPB = ACSWUPB - ACLWUPB\end{equation}

c404_hourly_dask_sub = c404_hourly_dask[["ACSWDNB", "ACSWUPB", "ACLWDNB", "ACLWUPB"]]
c404_hourly_dask_sub = c404_hourly_dask_sub.sel(time=slice('1979-10-01T00:00:00.000000000', '1979-10-01T00:00:00.000000000')) #subset to single time step
c404_hourly_dask_sub
<xarray.Dataset>
Dimensions:  (time: 1, y: 1015, x: 1367)
Coordinates:
    lat      (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon      (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
  * time     (time) datetime64[ns] 1979-10-01
  * x        (x) float64 -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06
  * y        (y) float64 -2.028e+06 -2.024e+06 -2.02e+06 ... 2.024e+06 2.028e+06
Data variables:
    ACSWDNB  (time, y, x) float32 dask.array<chunksize=(1, 175, 175), meta=np.ndarray>
    ACSWUPB  (time, y, x) float32 dask.array<chunksize=(1, 175, 175), meta=np.ndarray>
    ACLWDNB  (time, y, x) float32 dask.array<chunksize=(1, 175, 175), meta=np.ndarray>
    ACLWUPB  (time, y, x) float32 dask.array<chunksize=(1, 175, 175), meta=np.ndarray>
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...

First, calculate DNB and UPB.

c404_hourly_dask_sub["DNB"] = c404_hourly_dask_sub['ACSWDNB'] - c404_hourly_dask_sub['ACLWDNB']
c404_hourly_dask_sub['UPB'] = c404_hourly_dask_sub['ACSWUPB'] - c404_hourly_dask_sub['ACLWUPB']

Then calculate FAKERAD.

c404_hourly_dask_sub['FAKERAD'] = c404_hourly_dask_sub['DNB'] - c404_hourly_dask_sub['UPB']

This has all happened so fast! But has anything actually happened? Dask has been keeping track of the operations and won't run them until the compute() method is called. The execution will take longer than the last couple of cells, but it will be much faster than running all of these operations on an in-memory dataset.

c404_hourly_memory_sub = c404_hourly_dask_sub.compute()
c404_hourly_memory_sub
<xarray.Dataset>
Dimensions:  (time: 1, y: 1015, x: 1367)
Coordinates:
    lat      (y, x) float32 17.65 17.66 17.67 17.68 ... 51.74 51.73 51.71 51.69
    lon      (y, x) float32 -122.6 -122.5 -122.5 -122.5 ... -57.17 -57.12 -57.07
  * time     (time) datetime64[ns] 1979-10-01
  * x        (x) float64 -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06
  * y        (y) float64 -2.028e+06 -2.024e+06 -2.02e+06 ... 2.024e+06 2.028e+06
Data variables:
    ACSWDNB  (time, y, x) float32 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0
    ACSWUPB  (time, y, x) float32 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0
    ACLWDNB  (time, y, x) float32 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0
    ACLWUPB  (time, y, x) float32 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0
    DNB      (time, y, x) float32 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0
    UPB      (time, y, x) float32 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0
    FAKERAD  (time, y, x) float32 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...

If you are working with large tabular data, Dask DataFrames perform the same way and are modeled after pandas DataFrames. The task graphs behind .compute() can be visualized, and more information can be found here.

c404_hourly_dask_sub['ACSWDNB'].data.visualize()
    156                           renderer=renderer,
    157                           formatter=formatter,
    158                           neato_no_op=neato_no_op)
    159 kwargs = {'input_lines': (line.encode(input_encoding) for line in input_lines)}
--> 161 proc = execute.run_check(cmd, capture_output=True, quiet=quiet, **kwargs)
    162 return proc.stdout

File /home/conda/users/ef214d1986de9477d5223419ba4838d76e9839b19e4fb02c9e208c97cab5a61b-20230329-131713-443303-165-pangeofu/lib/python3.10/site-packages/graphviz/backend/execute.py:79, in run_check(cmd, input_lines, encoding, quiet, **kwargs)
     77     if kwargs.pop('capture_output'):
     78         kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE
---> 79     proc = _run_input_lines(cmd, input_lines, kwargs=kwargs)
     80 else:
     81     proc = subprocess.run(cmd, **kwargs)

File /home/conda/users/ef214d1986de9477d5223419ba4838d76e9839b19e4fb02c9e208c97cab5a61b-20230329-131713-443303-165-pangeofu/lib/python3.10/site-packages/graphviz/backend/execute.py:105, in _run_input_lines(cmd, input_lines, kwargs)
    102 for line in input_lines:
    103     stdin_write(line)
--> 105 stdout, stderr = popen.communicate()
    106 return subprocess.CompletedProcess(popen.args, popen.returncode,
    107                                    stdout=stdout, stderr=stderr)

File /home/conda/users/ef214d1986de9477d5223419ba4838d76e9839b19e4fb02c9e208c97cab5a61b-20230329-131713-443303-165-pangeofu/lib/python3.10/subprocess.py:1154, in Popen.communicate(self, input, timeout)
   1151     endtime = None
   1153 try:
-> 1154     stdout, stderr = self._communicate(input, endtime, timeout)
   1155 except KeyboardInterrupt:
   1156     # https://bugs.python.org/issue25942
   1157     # See the detailed comment in .wait().
   1158     if timeout is not None:

File /home/conda/users/ef214d1986de9477d5223419ba4838d76e9839b19e4fb02c9e208c97cab5a61b-20230329-131713-443303-165-pangeofu/lib/python3.10/subprocess.py:2005, in Popen._communicate(self, input, endtime, orig_timeout)
   1998     self._check_timeout(endtime, orig_timeout,
   1999                         stdout, stderr,
   2000                         skip_check_and_raise=True)
   2001     raise RuntimeError(  # Impossible :)
   2002         '_check_timeout(..., skip_check_and_raise=True) '
   2003         'failed to raise TimeoutExpired.')
-> 2005 ready = selector.select(timeout)
   2006 self._check_timeout(endtime, orig_timeout, stdout, stderr)
   2008 # XXX Rewrite these to use non-blocking I/O on the file
   2009 # objects; they are no longer using C stdio!

File /home/conda/users/ef214d1986de9477d5223419ba4838d76e9839b19e4fb02c9e208c97cab5a61b-20230329-131713-443303-165-pangeofu/lib/python3.10/selectors.py:416, in _PollLikeSelector.select(self, timeout)
    414 ready = []
    415 try:
--> 416     fd_event_list = self._selector.poll(timeout)
    417 except InterruptedError:
    418     return ready

KeyboardInterrupt: 

Finally, here are some examples of loading datasets from several open-access sources.

OPeNDAP data#

Now, looking at CERES-EBAF, loaded from an OPeNDAP source.

# ceres-ebaf url
ceres_url = "https://opendap.larc.nasa.gov/opendap/CERES/EBAF/Edition4.1/CERES_EBAF_Edition4.1_200003-202111.nc"

# bring in ceres-ebaf
ceres = xr.open_dataset(ceres_url, decode_coords="all")
ceres

Data on a THREDDS server#

# open daymet lazily 
# this happens by setting chunks={}
# though you can specify chunks where applicable
daymet = xr.open_dataset("https://thredds.daac.ornl.gov/thredds/dodsC/daymet-v4-agg/na.ncml", chunks={})
daymet
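To see what chunks={} buys you, here is a minimal self-contained sketch of eager versus lazy arrays, assuming xarray, dask, and numpy are installed. The dataset, variable, and chunk sizes are made up for illustration; a remote THREDDS dataset behaves the same way, just at a much larger scale.

```python
import numpy as np
import xarray as xr

# Small in-memory dataset standing in for a remote file (names are made up)
ds = xr.Dataset(
    {"tmax": (("time", "y", "x"), np.random.rand(10, 4, 4))},
    coords={"time": np.arange(10), "y": np.arange(4), "x": np.arange(4)},
)

lazy = ds.chunk({"time": 5})               # dask-backed; nothing is computed yet
mean = lazy["tmax"].mean("time")           # still lazy: this only builds a task graph
result = mean.compute()                    # now the computation actually runs

print(type(lazy["tmax"].data).__module__)  # a dask array module
print(result.shape)                        # (4, 4)
```

Until compute() (or .load(), or plotting) is called, only the task graph exists in memory, which is what makes subsetting terabyte-scale data practical.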

Clipping xarray datasets to a bounding box#

Here are two possible ways to clip to a bounding box, and notes on when each might suit you best. Why a bounding box and not an exact clip? If you store an intermediate dataset, a bounding box keeps the data small while leaving you the flexibility to extract the exact area later or to keep only the cells inside the area of interest.

The first approach uses xarray’s index-based selection. This is beneficial because it requires no extra libraries, but there have been issues when the dataset’s grid cells are large relative to a small geographic area. In some cases it will drop grid cells that the second method retains.

from pygeohydro import WBD
from cartopy import crs as ccrs
# bring in boundaries of DRB and create single polygon
drb = WBD("huc6", outfields=["huc6", "name"]).byids("huc6", ["020401", "020402"])

# create a column where all entries have the same value
drb["name"] = "DRB"

# dissolve by that column
drb = drb.dissolve(by="name")

# set CRS to match ds
drb = drb.iloc[[0]].to_crs(ccrs.LambertConformal())

# bounding box as a list, ordered (minx, miny, maxx, maxy)
drb_bbox = list(drb.total_bounds)

drb_bbox
# Daymet's y coordinate is descending (north to south),
# so the y slice runs from maxy down to miny
daymet_sel = daymet.sel(x=slice(drb_bbox[0], drb_bbox[2]), y=slice(drb_bbox[3], drb_bbox[1]))
daymet_sel

In the above, note that total_bounds is ordered (minx, miny, maxx, maxy) and that Daymet’s y coordinate is descending. Passing the wrong elements to slice, or slicing a descending coordinate in ascending order, silently returns an empty dimension rather than raising an error.
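The descending-y pitfall is easy to reproduce on a tiny synthetic grid. This is a sketch assuming xarray and numpy are installed; the coordinates, variable name, and box values are made up.

```python
import numpy as np
import xarray as xr

# Synthetic grid with a descending y coordinate, like Daymet (values made up)
ds = xr.Dataset(
    {"prcp": (("y", "x"), np.arange(25.0).reshape(5, 5))},
    coords={"y": [40.0, 30.0, 20.0, 10.0, 0.0], "x": [0.0, 10.0, 20.0, 30.0, 40.0]},
)

minx, miny, maxx, maxy = 5.0, 5.0, 35.0, 35.0

# Ascending x: slice from minx to maxx. Descending y: slice from maxy to miny.
sub = ds.sel(x=slice(minx, maxx), y=slice(maxy, miny))
print(dict(sub.sizes))  # {'y': 3, 'x': 3}

# Slicing the descending y in ascending order silently returns nothing
empty = ds.sel(x=slice(minx, maxx), y=slice(miny, maxy))
print(dict(empty.sizes))  # {'y': 0, 'x': 3}
```

Label-based slicing must follow the stored order of the coordinate, which is why it pays to check the coordinate direction before subsetting.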

The second approach is the rioxarray clip_box method. rioxarray brings rasterio functionality to xarray by adding a rio accessor to xarray objects; documentation can be found here about rioxarray. This method does require an additional library, but it ensures that every grid cell intersecting the bounding box is included.

import rioxarray

# add crs to dataset
daymet.rio.write_crs(ccrs.LambertConformal(), inplace=True)

# drop time_bnds, otherwise it will throw an error in clip_box
daymet = daymet.drop_vars(["time_bnds"])

#clip to bbox
daymet_bbox = daymet.rio.clip_box(minx=drb_bbox[0],
                            miny=drb_bbox[1],
                            maxx=drb_bbox[2],
                            maxy=drb_bbox[3],
                            crs=ccrs.LambertConformal())

daymet_bbox

As can be seen, index selection works well but ends up with one fewer grid cell (compare the y dimension):

print("Index select:\n", daymet_sel.dims)
print("Rioxarray:\n", daymet_bbox.dims)

Finally, it is always important to shut down the client and cluster.

client.close(); cluster.shutdown()
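Alternatively, context managers guarantee cleanup even if the work inside raises an error. This is a sketch assuming dask.distributed is installed; processes=False and the toy computation are illustrative choices, not part of the original notebook.

```python
from dask.distributed import Client, LocalCluster

# processes=False keeps everything in one process, which is enough for a demo;
# both the client and the cluster are closed automatically on exit
with LocalCluster(n_workers=1, threads_per_worker=1, processes=False) as cluster, \
        Client(cluster) as client:
    total = client.submit(sum, [1, 2, 3]).result()

print(total)  # 6
```

This pattern is handy in scripts, where an unhandled exception would otherwise leave workers running.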